from pathlib import Path

import numpy as np
import pandas as pd


MISSING_CODES = {7, 8, 9, 77, 88, 99, 777, 888, 999, 7777, 8888, 9999}


def clean_numeric(series: pd.Series) -> pd.Series:
    s = pd.to_numeric(series, errors="coerce")
    s = s.mask(s.isin(MISSING_CODES))
    return s


def zscore(series: pd.Series) -> pd.Series:
    s = pd.to_numeric(series, errors="coerce")
    m = s.mean()
    sd = s.std(ddof=0)
    if pd.isna(sd) or sd <= 0:
        return s * 0
    return (s - m) / sd


def main() -> None:
    root = Path(__file__).resolve().parents[1]
    out_path = root / "data_external" / "rollout_shiftshare_region_year.csv"
    out_path.parent.mkdir(parents=True, exist_ok=True)

    ess = pd.read_parquet(root / "outputs" / "ess_r8_r11_min.parquet")
    ess = ess[ess["regunit"].isin([1, 2])].copy()
    ess["region"] = ess["region"].astype(str).str.upper().str.strip()
    ess = ess[ess["region"] != "99999"].copy()

    reg = pd.read_csv(root / "data_external" / "broadband_region_year_eurostat_isoc_r_broad_h.csv")
    reg["region"] = reg["region"].astype(str).str.upper().str.strip()
    reg["year"] = pd.to_numeric(reg["year"], errors="coerce").astype("Int64")
    reg["broadband_pc_hh"] = pd.to_numeric(reg["broadband_pc_hh"], errors="coerce")

    # Baseline exposure: region broadband access in 2016 (same NUTS vintage as our region codes)
    base_year = 2016
    base = reg[reg["year"] == base_year][["region", "broadband_pc_hh"]].rename(columns={"broadband_pc_hh": "bb_access_base_2016"})
    base["cntry"] = base["region"].str[:2]
    base["bb_access_base_2016"] = pd.to_numeric(base["bb_access_base_2016"], errors="coerce")
    base = base.dropna(subset=["bb_access_base_2016"]).copy()

    # Center exposure within country (keeps identification strictly within-country)
    base["bb_access_base_2016_ct_centered"] = base["bb_access_base_2016"] - base.groupby("cntry")["bb_access_base_2016"].transform("mean")
    base["z_bb_access_base_2016_ct_centered"] = zscore(base["bb_access_base_2016_ct_centered"])

    cbs = pd.read_csv(root / "data_external" / "broadband_country_year_eurostat_isoc_cbs.csv")
    cbs["year"] = pd.to_numeric(cbs["year"], errors="coerce").astype("Int64")
    cbs["coverage_pc_hh"] = pd.to_numeric(cbs["coverage_pc_hh"], errors="coerce")
    cbs = cbs.dropna(subset=["cntry", "year", "inet_spd"]).copy()

    wide = (
        cbs.pivot_table(index=["cntry", "year"], columns="inet_spd", values="coverage_pc_hh", aggfunc="mean")
        .reset_index()
        .rename_axis(None, axis=1)
    )
    for col in ["MBPS_GT2", "MBPS_GT30", "MBPS_GT100", "GBPS_GT1"]:
        if col not in wide.columns:
            wide[col] = np.nan

    # Define shocks relative to a baseline year.
    # - For 2/30/100 Mbps tiers we can use 2016 (available in all countries).
    # - For gigabit coverage, Eurostat has no 2016 data in isoc_cbs for our sample; use 2019 as a common baseline.
    base_ct = wide[wide["year"] == base_year][["cntry", "MBPS_GT30", "MBPS_GT100"]].rename(
        columns={"MBPS_GT30": "cov30_base_2016", "MBPS_GT100": "cov100_base_2016"}
    )
    base_ct2 = wide[wide["year"] == base_year][["cntry", "MBPS_GT2"]].rename(columns={"MBPS_GT2": "cov2_base_2016"})
    wide = wide.merge(base_ct, on="cntry", how="left")
    wide = wide.merge(base_ct2, on="cntry", how="left")
    base_gbps_year = 2019
    base_gbps = wide[wide["year"] == base_gbps_year][["cntry", "GBPS_GT1"]].rename(columns={"GBPS_GT1": "gbps1_base_2019"})
    wide = wide.merge(base_gbps, on="cntry", how="left")
    wide["shock_cov2"] = wide["MBPS_GT2"] - wide["cov2_base_2016"]
    wide["shock_cov30"] = wide["MBPS_GT30"] - wide["cov30_base_2016"]
    wide["shock_cov100"] = wide["MBPS_GT100"] - wide["cov100_base_2016"]
    wide["shock_gbps1"] = wide["GBPS_GT1"] - wide["gbps1_base_2019"]

    cbt = pd.read_csv(root / "data_external" / "broadband_country_year_eurostat_isoc_cbt.csv")
    cbt["year"] = pd.to_numeric(cbt["year"], errors="coerce").astype("Int64")
    cbt["coverage_pc_hh"] = pd.to_numeric(cbt["coverage_pc_hh"], errors="coerce")
    cbt = cbt.dropna(subset=["cntry", "year", "inet_tec"]).copy()
    cbt_w = (
        cbt.pivot_table(index=["cntry", "year"], columns="inet_tec", values="coverage_pc_hh", aggfunc="mean")
        .reset_index()
        .rename_axis(None, axis=1)
    )
    for col in ["FTTP", "NGA", "VHCN_FX", "DOCSIS3_1"]:
        if col not in cbt_w.columns:
            cbt_w[col] = np.nan

    # FTTP + NGA have 2016 coverage for all countries in our sample.
    fttp_base = cbt_w[cbt_w["year"] == base_year][["cntry", "FTTP"]].rename(columns={"FTTP": "fttp_base_2016"})
    nga_base = cbt_w[cbt_w["year"] == base_year][["cntry", "NGA"]].rename(columns={"NGA": "nga_base_2016"})
    cbt_w = cbt_w.merge(fttp_base, on="cntry", how="left")
    cbt_w = cbt_w.merge(nga_base, on="cntry", how="left")
    cbt_w["shock_fttp"] = cbt_w["FTTP"] - cbt_w["fttp_base_2016"]
    cbt_w["shock_nga"] = cbt_w["NGA"] - cbt_w["nga_base_2016"]

    # VHCN and DOCSIS3.1 start in 2019 for our sample; use 2019 baseline so shocks are defined for 2021/2024.
    base_cbt_year = 2019
    vhcn_base = cbt_w[cbt_w["year"] == base_cbt_year][["cntry", "VHCN_FX"]].rename(columns={"VHCN_FX": "vhcn_fx_base_2019"})
    docsis_base = cbt_w[cbt_w["year"] == base_cbt_year][["cntry", "DOCSIS3_1"]].rename(
        columns={"DOCSIS3_1": "docsis31_base_2019"}
    )
    cbt_w = cbt_w.merge(vhcn_base, on="cntry", how="left")
    cbt_w = cbt_w.merge(docsis_base, on="cntry", how="left")
    cbt_w["shock_vhcn_fx"] = cbt_w["VHCN_FX"] - cbt_w["vhcn_fx_base_2019"]
    cbt_w["shock_docsis31"] = cbt_w["DOCSIS3_1"] - cbt_w["docsis31_base_2019"]

    # Build region-year panel for ESS years, excluding 2016 so baseline exposure is predetermined in estimation window.
    years = sorted(int(y) for y in ess["survey_year"].dropna().unique().tolist())
    years = [y for y in years if y > base_year]
    regions = sorted(base["region"].unique().tolist())
    panel = pd.MultiIndex.from_product([regions, years], names=["region", "survey_year"]).to_frame(index=False)
    panel["cntry"] = panel["region"].str[:2]
    panel = panel.merge(base[["region", "cntry", "z_bb_access_base_2016_ct_centered"]], on=["region", "cntry"], how="left")
    panel = panel.merge(
        wide[["cntry", "year", "shock_cov2", "shock_cov30", "shock_cov100", "shock_gbps1"]].rename(columns={"year": "survey_year"}),
        on=["cntry", "survey_year"],
        how="left",
    )
    panel = panel.merge(
        cbt_w[["cntry", "year", "shock_fttp", "shock_nga", "shock_vhcn_fx", "shock_docsis31"]].rename(columns={"year": "survey_year"}),
        on=["cntry", "survey_year"],
        how="left",
    )

    panel["ss_cov2"] = panel["z_bb_access_base_2016_ct_centered"] * panel["shock_cov2"]
    panel["ss_cov30"] = panel["z_bb_access_base_2016_ct_centered"] * panel["shock_cov30"]
    panel["ss_cov100"] = panel["z_bb_access_base_2016_ct_centered"] * panel["shock_cov100"]
    panel["ss_gbps1"] = panel["z_bb_access_base_2016_ct_centered"] * panel["shock_gbps1"]
    panel["ss_fttp"] = panel["z_bb_access_base_2016_ct_centered"] * panel["shock_fttp"]
    panel["ss_nga"] = panel["z_bb_access_base_2016_ct_centered"] * panel["shock_nga"]
    panel["ss_vhcn_fx"] = panel["z_bb_access_base_2016_ct_centered"] * panel["shock_vhcn_fx"]
    panel["ss_docsis31"] = panel["z_bb_access_base_2016_ct_centered"] * panel["shock_docsis31"]

    panel["z_ss_cov2"] = zscore(panel["ss_cov2"])
    panel["z_ss_cov30"] = zscore(panel["ss_cov30"])
    panel["z_ss_cov100"] = zscore(panel["ss_cov100"])
    panel["z_ss_gbps1"] = zscore(panel["ss_gbps1"])
    panel["z_ss_fttp"] = zscore(panel["ss_fttp"])
    panel["z_ss_nga"] = zscore(panel["ss_nga"])
    panel["z_ss_vhcn_fx"] = zscore(panel["ss_vhcn_fx"])
    panel["z_ss_docsis31"] = zscore(panel["ss_docsis31"])

    panel["source"] = (
        "Shift-share: region baseline broadband access (isoc_r_broad_h 2016) "
        "× country-year coverage shocks (Eurostat isoc_cbs/isoc_cbt; 2016 or 2019 baseline depending on tier)"
    )
    panel = panel.sort_values(["region", "survey_year"])
    panel.to_csv(out_path, index=False, encoding="utf-8")
    print(f"Wrote: {out_path} rows={len(panel):,} regions={panel['region'].nunique()} years={panel['survey_year'].nunique()}")


if __name__ == "__main__":
    main()
